package com.bw.gmall.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bw.gmall.realtime.bean.CartAddUuBean;
import com.bw.gmall.realtime.utils.DateFormatUtil;
import com.bw.gmall.realtime.utils.MyClickHouseUtil;
import com.bw.gmall.realtime.utils.MyKafkaUtil;
import com.sun.xml.internal.bind.v2.TODO;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Flink streaming job that counts, per tumbling 10-second event-time window, the users
 * making their first cart-add of the day.
 *
 * <p>Pipeline: read the {@code dwd_trade_cart_add} Kafka topic, parse each message as JSON,
 * assign event time from {@code operate_time} (falling back to {@code create_time}), key by
 * {@code user_id}, emit one {@link CartAddUuBean} the first time a user is seen on a given
 * date, sum those beans per window, and write the result to the ClickHouse table
 * {@code dws_trade_cart_add_uu_window}.
 */
public class DwsTradeCartAddUuWindow {
    public static void main(String[] args) throws Exception {

        // Step 1: obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Step 2: read the DWD-layer cart-add fact topic from Kafka.
        String topic = "dwd_trade_cart_add";
        String groupId = "dws_trade_cart_add_uu_window_211126";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        // Step 3: convert each message into a JSON object.
        SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDS.map(JSON::parseObject);

        // Step 4: extract the event time and generate watermarks (2 seconds of allowed out-of-orderness).
        SingleOutputStreamOperator<JSONObject> jsonObjWmDs = jsonObjDs.assignTimestampsAndWatermarks(
                WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject jsonObject, long recordTimestamp) {
                                String eventTime = resolveEventTime(jsonObject);
                                if (eventTime == null) {
                                    // Neither time field is present: keep whatever timestamp the record
                                    // already carries instead of failing the job on a single bad record.
                                    return recordTimestamp;
                                }
                                return DateFormatUtil.toTs(eventTime, true);
                            }
                        }));

        // Step 5: key the stream by user_id.
        KeyedStream<JSONObject, String> keyedStream = jsonObjWmDs.keyBy(o -> o.getString("user_id"));

        // Step 6: use keyed state to keep only each user's first cart-add per date.
        SingleOutputStreamOperator<CartAddUuBean> cartAddDs = keyedStream.flatMap(new RichFlatMapFunction<JSONObject, CartAddUuBean>() {

            /** Date (text before the first space of the event time) of this user's last counted cart-add. */
            private ValueState<String> cart;

            @Override
            public void open(Configuration par) {
                // The stored date expires one day after it was last created or written.
                StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.days(1))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .build();

                ValueStateDescriptor<String> state = new ValueStateDescriptor<>("cart", String.class);
                state.enableTimeToLive(ttlConfig);

                cart = getRuntimeContext().getState(state);
            }

            @Override
            public void flatMap(JSONObject jsonObject, Collector<CartAddUuBean> collector) throws Exception {
                String eventTime = resolveEventTime(jsonObject);
                if (eventTime == null) {
                    // Without any time field the record cannot be attributed to a date; skip it.
                    return;
                }

                String curDt = eventTime.split(" ")[0];
                String lastDt = cart.value();

                // Emit only when this user has not yet been counted for the current date.
                if (!curDt.equals(lastDt)) {
                    cart.update(curDt);
                    // Window start/end and ts are placeholders here; the window function fills them in.
                    collector.collect(new CartAddUuBean(
                            "",
                            "",
                            1L,
                            null));
                }
            }
        });

        // Step 7: window and aggregate.
        SingleOutputStreamOperator<CartAddUuBean> resDs = cartAddDs
                .windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
                .reduce(new ReduceFunction<CartAddUuBean>() {
                    @Override
                    public CartAddUuBean reduce(CartAddUuBean t1, CartAddUuBean t2) throws Exception {
                        // Accumulate the per-user counts into the first bean.
                        t1.setCartAddUuCt(t1.getCartAddUuCt() + t2.getCartAddUuCt());
                        return t1;
                    }
                }, new AllWindowFunction<CartAddUuBean, CartAddUuBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow timeWindow, Iterable<CartAddUuBean> iterable, Collector<CartAddUuBean> collector) throws Exception {
                        // The reduce step pre-aggregates the window, so the iterable holds the reduced bean.
                        CartAddUuBean next = iterable.iterator().next();

                        next.setStt(DateFormatUtil.toYmdHms(timeWindow.getStart()));
                        next.setEdt(DateFormatUtil.toYmdHms(timeWindow.getEnd()));
                        next.setTs(System.currentTimeMillis());

                        collector.collect(next);
                    }
                });

        // Step 8: print the result and write it to ClickHouse.
        resDs.print(">>>>>>>>>>>");
        resDs.addSink(MyClickHouseUtil.getSinkFunction("insert into dws_trade_cart_add_uu_window values (?,?,?,?)"));

        env.execute();
    }

    /**
     * Returns the event-time string of a cart-add record.
     *
     * @param record the parsed cart-add record
     * @return {@code operate_time} when present, otherwise {@code create_time};
     *         {@code null} when the record carries neither field
     */
    private static String resolveEventTime(JSONObject record) {
        String operateTime = record.getString("operate_time");
        return operateTime != null ? operateTime : record.getString("create_time");
    }
}
